package com.o2o.cleaning.month.platform.ebusiness_plat.intime

import com.alibaba.fastjson.{JSON, JSONObject}
import com.mongodb.spark.MongoSpark
import com.mongodb.spark.config.ReadConfig
import com.mongodb.spark.rdd.MongoRDD
import org.apache.spark.SparkContext
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import org.bson.Document
import org.elasticsearch.spark._

/**
 * Ad-hoc data check for the "intime" platform.
 *
 * `main` loads two MongoDB collections, registers them as temporary views and prints,
 * for a fixed set of store ids, the distinct (mallId, mallName) pairs left-joined with
 * the shop address. The remaining methods are small export utilities (Elasticsearch or
 * JSON source data written out as ORC).
 *
 * NOTE(security): Elasticsearch, S3 and MongoDB credentials are hard-coded in this file.
 * They should be moved to external configuration and rotated.
 */
object CheckDataDetail {

  // MongoDB connection string used by `main`.
  // The original literal had a stray space after '@', which is not valid in a connection URI.
  private val MongoReadUri = "mongodb://root:O2Odata123!@192.168.0.149:27017/admin"

  // MongoDB database holding the collections queried by `main`.
  private val MongoDatabase = "Intime"

  // Store ids to check. They are kept as plain data (comma/whitespace separated) and turned
  // into the SQL IN-list at query time. Previously the ids were embedded in one hard-wrapped
  // SQL literal, and two of them (60050840650 and 1000000050839) were split by a line break
  // inside their quotes, so they could never match. Duplicates in the list are dropped.
  private val TargetStoreIds: Seq[String] =
    """
      |73149092,1000000050012,1000000050993,60006330652,1000000051643,60052130650,1000000051228,1000000049158,
      |1000000050949,60028240650,60006565650,60033130650,60006195651,82531114,60055525666,1000000052469,
      |1000000050707,1000000050008,60050450652,60026990650,1000000050880,60038995653,60024080651,86753370,
      |60006525652,86768412,60003570654,86742345,79447073,1000000050049,60045925653,1000000052681,
      |1000000051232,79452048,60049780650,1000000051642,60005775653,1000000051368,1000000050734,60045090650,
      |60006365652,60052555650,1000000050836,60038080663,60040130652,60001765676,1000000052498,60008035651,
      |1000000052719,60046485650,60000007206,60001880657,60007170655,60006040650,1000000050472,60006220654,
      |1000000050886,1000000050053,1000000051273,60039945654,1000000052005,1000000051958,60057835650,86767348,
      |1000000052933,1000000052204,1000000052267,1000000049706,1000000050911,1000000052396,1000000050935,60001890661,
      |60006310657,60027150656,60023380657,1000000050640,1000000046437,60001815651,1000000050708,60006450652,
      |60003345668,1000000052205,60022375650,60040225651,1000000051956,1000000000274,60037590650,1000000049355,
      |1000000046437,60014542194,60006155651,60021860651,1000000052042,60002345667,1000000052512,1000000050471,
      |60042170655,60057460650,1000000052617,1000000051478,60032645651,60023345654,60038305653,1000000050772,
      |1000000050915,1000000049605,1000000050831,1000000052170,1000000051950,60000005526,60036295651,1000000052026,
      |1000000049591,1000000051783,1000000051671,60001765659,60006480651,60040270650,60006545650,60001890656,
      |60006500652,60019645650,70805132,1000000051227,1000000052526,1000000043006,1000000052457,60023555652,
      |60005180651,60006305662,79448057,60038430651,1000000052459,79825026,60050840650,60043780651,
      |1000000050542,1000000052350,60006505650,1000000050397,60052910657,1000000052342,1000000050365,1000000051636,
      |60006340654,60001885651,1000000052415,60008010653,60037950650,1000000051427,1000000052004,60049920650,
      |1000000051037,79451047,1000000050027,60006535650,60001765661,60026325650,1000000050877,1000000049598,
      |1000000051592,60006150651,1000000049596,1000000050832,60006305655,60000018303,60044035654,1000000044048,
      |60055330651,60052075650,1000000050812,1000000052957,1000000050491,1000000051181,60026785653,60006525654,
      |1000000050889,60044060657,60006510650,1000000052009,1000000051035,1000000050823,1000000052075,1000000050374,
      |60052625650,1000000051709,60051830650,60024120650,60057300650,60018270652,1000000044670,1000000050151,
      |1000000050147,60006365651,111855307,1000000050623,1000000049933,1000000052404,60040575650,1000000050158,
      |60001875676,60006035652,1000000051169,60006510655,1000000053203,60051155653,1000000049664,1000000051595,
      |1000000049670,60001880659,1000000051249,1000000049363,1000000051091,1000000049597,60036645650,60030120652,
      |1000000051036,60000005525,60035110650,1000000053992,1000000049892,1000000052460,60036425651,60001765656,
      |60005745652,60006350652,60045370651,1000000052001,1000000051562,1000000050078,1000000051417,60051830650,
      |1000000050581,79443119,1000000050493,1000000050888,60039945653,60017600652,60006335651,1000000050622,
      |60006375650,60023730655,60024190651,60006315656,82789337,1000000052503,60007145651,1000000051542,
      |60006305660,1000000051984,1000000052151,60005180651,79437387,1000000053992,1000000050714,60006440651,
      |1000000051736,60001890653,1000000050011,1000000049774,1000000042709,1000000049665,1000000052698,1000000049745,
      |1000000050637,60006405653,1000000050407,1000000044048,1000000053129,1000000051693,60001765653,60039360657,
      |60058455650,60024190651,60014543437,60000018303,1000000049599,60000005700,60014543444,60056070650,
      |60038080660,60054330650,1000000050403,60006455650,1000000050954,1000000050881,1000000051022,60047370650,
      |1000000051407,1000000051657,91687399,1000000049940,1000000052418,1000000052502,60035405650,60024120650,
      |1000000052082,1000000050744,1000000051881,60006410650,1000000054951,1000000049600,1000000050054,1000000054434,
      |60024440651,1000000050882,1000000052666,60006430653,1000000050652,1000000052876,1000000052458,60055625650,
      |60006450651,60053385650,60001890654,1000000050174,1000000044292,60038745650,1000000051041,60006505651,
      |1000000051170,1000000051934,1000000050782,1000000051295,1000000051231,1000000051092,60006460651,60018805650,
      |60006310652,1000000050783,60059545651,60004595677,60006195650,60006515651,1000000050399,1000000051115,
      |1000000052721,79441194,1000000051641,60006215652,1000000052484,1000000049592,60006520652,1000000051356,
      |1000000051941,1000000052722,60040610650,1000000050245,1000000050840,1000000052575,60026180650,60001765655,
      |60050070650,1000000050594,86745368,1000000051305,1000000049971,60004595677,1000000049722,1000000051882,
      |1000000054052,1000000049939,60013500650,1000000049927,1000000051251,1000000054822,60001765675,60054345650,
      |1000000052667,60034680651,1000000050610,60019540650,60006355653,1000000050246,60006405651,60054480650,
      |60038305652,1000000051039,1000000052800,1000000053172,60038400651,1000000050488,1000000042957,86755350,
      |1000000050609,1000000048638,1000000050160,1000000052905,60036945652,1000000042467,1000000051086,60005785650,
      |60032645651,60006520655,60006470650,60006340651,1000000050885,1000000051084,60055740651,60026465650,
      |60006465651,1000000050839,1000000051329,60051105651,1000000051482,60001890659,60022375650,86767349,
      |60006305659,60023350660,1000000050775,1000000050165,1000000051311,1000000052345,1000000050402,1000000052248,
      |1000000052074,60035625654,70805132,1000000051053,60006515650,60050070650,86751334,73149092,
      |60025800650,1000000053494,1000000043325,1000000049597,1000000050401,1000000052449,1000000052388,1000000051885,
      |1000000050639,60006475655,1000000051171,1000000050952,60042390667,1000000049605,60050950653,1000000053255,
      |60006370651,1000000052013,60003345668,60006315654,86769387,60006355652,1000000055111,79441194,
      |60042165652,60059090650,1000000050682,86740393,60021995650,60030395659,60038805650,79442122,
      |1000000052417,60006220655,60034240651,60006835650,60037655655,86746370,1000000051089,1000000050244,
      |60001765654,60054135671,60006520654,1000000050582,1000000052003
      |""".stripMargin.split("[,\\s]+").filter(_.nonEmpty).distinct.toSeq

  /**
   * Entry point: builds a local Spark session, loads the `intime_id_2205` and
   * `intime_shop_2` MongoDB collections as temporary views, and prints the mall/address
   * join for [[TargetStoreIds]].
   *
   * @param args command-line arguments (not used)
   */
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder()
      .appName("CheckDataDetail")
      .config("spark.debug.maxToStringFields", "2000")
      .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
      .config("spark.sql.caseSensitive", "true")
      .config("es.nodes", "192.168.1.29")
      .config("es.port", "9200")
      .config("cluster.name", "O2OElastic")
      .config("es.net.http.auth.user", "elastic")
      .config("es.net.http.auth.pass", "changeme")
      .master("local[*]")
      .getOrCreate()

    // Always release the session, even when loading or the query fails.
    try {
      val sc = spark.sparkContext
      sc.hadoopConfiguration.set("fs.s3a.access.key", "GAO7EO9FWKPJ8WFCQDME")
      sc.hadoopConfiguration.set("fs.s3a.secret.key", "LZ0xaHBSYKHaJ9ECDbX9f7zin79UZkXfGoNapRPL")
      sc.hadoopConfiguration.set("fs.s3a.endpoint", "https://obs.cn-north-1.myhuaweicloud.com")
      sc.setLogLevel("WARN")

      // Load the MongoDB data and expose each collection as a view named after it.
      registerMongoView(spark, "intime_id_2205")
      registerMongoView(spark, "intime_shop_2")

      // Ids come from a constant made only of digits, so quoting them inline is safe.
      val storeIdList = TargetStoreIds.map(id => s"'$id'").mkString(",")

      spark.sql(
        s"""
           |select distinct *
           |from (
           |    select mallId,mallName from intime_id_2205
           |    where storeId in ($storeIdList)
           |    group by storeId,mallId,mallName
           |    ) t1 left join(
           |    select shopId,address from intime_shop_2 group by shopId,address
           |    ) t2 on t1.mallId = t2.shopId
           |""".stripMargin)
        .show(false)
      // To export instead of printing, replace `.show(false)` with e.g.
      //   .repartition(1).write.option("header","true").csv("D://test")
    } finally {
      spark.stop()
    }
  }

  /**
   * Loads one collection from [[MongoDatabase]] and registers it as a temporary view
   * with the same name as the collection.
   */
  private def registerMongoView(spark: SparkSession, collection: String): Unit =
    spark.read
      .json(loadMongoData(spark, MongoReadUri, MongoDatabase, collection))
      .createOrReplaceTempView(collection)

  /**
   * Reads the Elasticsearch resource `<year>_intime/intime_<year>_<month>` as JSON and
   * writes it as a single-partition ORC output under
   * `s3a://o2o-dataproces-group/zyf/intime/essource/<year>/<month>/`.
   *
   * @param spark session used to parse the JSON and write the ORC output
   * @param sc    Spark context used to read from Elasticsearch
   * @param year  year used in the index name and the output path
   * @param month month used in the index name and the output path (not zero-padded here)
   */
  def fun(spark: SparkSession, sc: SparkContext, year: Int, month: Int): Unit = {
    val index = s"${year}_intime/intime_${year}_${month}"
    println(s"----------$index----------")
    // esJsonRDD yields key/value pairs; only the values are parsed as JSON documents.
    val documents = sc.esJsonRDD(index).values
    spark.read.json(documents)
      .repartition(1)
      .write.orc(s"s3a://o2o-dataproces-group/zyf/intime/essource/${year}/${month}/")
  }

  /**
   * Converts the monthly intime JSON source data to ORC.
   *
   * Despite its name, this method reads JSON and writes ORC: the input is
   * `s3a://o2o-sourcedata-<year>/obs-source-<year>/<month>/intime/intime_<year>_<month>/`
   * and the single-partition output goes to
   * `s3a://o2o-dataproces-group/zyf/intime/mongosource/<year>/<month>/`.
   *
   * @param spark session used for reading and writing
   * @param sc    not used; kept so existing callers keep compiling
   * @param year  year used in the input and output paths
   * @param month month used in the input and output paths
   */
  def orcToJson(spark: SparkSession, sc: SparkContext, year: Int, month: Int): Unit = {
    println(s"${year}---${month}")
    spark.read.json(s"s3a://o2o-sourcedata-${year}/obs-source-${year}/${month}/intime/intime_${year}_${month}/")
      .repartition(1)
      .write.orc(s"s3a://o2o-dataproces-group/zyf/intime/mongosource/${year}/${month}/")
  }

  /**
   * Loads a MongoDB collection as an RDD of JSON strings with the `_id` field removed.
   *
   * @param spark          session whose Spark context performs the read
   * @param readUri        MongoDB connection URI
   * @param readDatabase   database name
   * @param readCollection collection name
   * @return one JSON string per document, without its `_id` field
   */
  def loadMongoData(spark: SparkSession, readUri: String, readDatabase: String, readCollection: String): RDD[String] = {
    val readConfig = ReadConfig(Map("uri" -> readUri, "database" -> readDatabase, "collection" -> readCollection))

    val mongoRDD: MongoRDD[Document] = MongoSpark.load(spark.sparkContext, readConfig)

    mongoRDD.map { document =>
      // Round-trip through fastjson so `_id` can be dropped before serialising again.
      val json: JSONObject = JSON.parseObject(document.toJson())
      json.remove("_id")
      json.toString
    }
  }
}
